package com.tanx.cqrs.infrastructure.spring.event;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tanx.cqrs.event.Event;
import com.tanx.cqrs.event.EventBus;
import com.tanx.cqrs.event.handler.EventHandlerResolver;
import com.tanx.cqrs.event.store.EventStoreRepository;
import com.tanx.cqrs.infrastructure.spring.event.store.EventStoreRepositoryFactory;
import com.tanx.cqrs.saga.Saga;
import com.tanx.cqrs.saga.SagaHandlerResolver;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

/**
 * Template base class for event buses.
 *
 * <p>Provides the transport-independent parts of {@link EventBus}: storing an incoming
 * event, dispatching it to the event and saga handler resolvers, marking it as finished,
 * and converting {@link EventProxy} messages to and from JSON bytes. Subclasses supply
 * the actual transport by implementing {@link #sendEvent(EventProxy)}.
 */
@Slf4j
public abstract class AbstractEventBus implements EventBus {

    /**
     * Shared Jackson mapper with default configuration. It is never reconfigured after
     * construction, so a single instance is reused instead of building one per call.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Stack frames whose class name starts with this prefix are skipped when locating the sender. */
    private static final String INTERNAL_PACKAGE_PREFIX = "com.tanx.cqrs.infrastructure.spring.event";

    private final EventHandlerResolver resolver;
    private final SagaHandlerResolver sagaHandlerResolver;
    private final EventStoreRepositoryFactory factory;

    /**
     * Creates the bus with its collaborators.
     *
     * @param resolver            resolver that dispatches events to event handlers
     * @param sagaHandlerResolver resolver that dispatches events to saga handlers
     * @param factory             factory used to look up the store repository for an event class
     */
    public AbstractEventBus(EventHandlerResolver resolver, SagaHandlerResolver sagaHandlerResolver, EventStoreRepositoryFactory factory) {
        this.resolver = resolver;
        this.sagaHandlerResolver = sagaHandlerResolver;
        this.factory = factory;
    }

    /**
     * Consumes a received message.
     *
     * <p>A {@link DealtEventProxy} is ignored outright. Otherwise the wrapped event is
     * stored; handlers are invoked only when the event was newly stored, and
     * {@link #eventFinish(Event)} is called afterwards in both cases. If storing or a
     * handler throws, the exception propagates and {@code eventFinish} is not reached.
     *
     * @param event the received message
     */
    protected void consumeEvent(EventProxy event) {
        if (event instanceof DealtEventProxy) {
            return;
        }
        Event payload = event.getEvent();
        boolean newlyStored = saveEvent(payload);
        if (newlyStored) {
            invokeHandler(payload);
        }
        eventFinish(payload);
    }

    /**
     * Dispatches the event to the event handler resolver first, then to the saga handler resolver.
     *
     * @param event the event to dispatch
     */
    @Override
    public void invokeHandler(Event event) {
        resolver.handlerEvent(event);
        sagaHandlerResolver.handlerEvent(event);
    }

    /**
     * Tells the store repository registered for the event's class that the event,
     * identified by its UUID, is finished.
     *
     * @param event the event to mark as finished
     */
    @Override
    public void eventFinish(Event event) {
        EventStoreRepository storeRepository = factory.getStoreRepository(event.getClass());
        storeRepository.finishEvent(event.getUuid());
    }

    /**
     * Stores the event unless an event with the same UUID is already present.
     *
     * @param event the event to store
     * @return {@code true} if the event was stored, {@code false} if an event with the
     *         same UUID already existed and this one is being ignored
     * @throws RuntimeException if the repository reports that saving failed
     */
    private boolean saveEvent(Event event) {
        EventStoreRepository storeRepository = factory.getStoreRepository(event.getClass());
        Event existEvent = storeRepository.findByEventId(event.getUuid());
        if (existEvent != null) {
            log.warn("已经存在该事件,事件{}将被忽略", event);
            return false;
        }
        if (!storeRepository.save(event)) {
            throw new RuntimeException("保存事件失败,事件为:" + event);
        }
        return true;
    }

    /**
     * Wraps the event together with the class of its sender and hands it to the transport.
     * The type parameter is not used by this implementation.
     *
     * @param event the event to send
     */
    @Override
    public <T> void sendEvent(Event event) {
        EventProxy eventProxy = new EventProxy(event, getSenderClass());
        sendEvent(eventProxy);
    }

    /**
     * Finds the sender by walking the current stack trace and loading the class of the
     * first frame whose class name does not start with this infrastructure package.
     *
     * @return the class of the first frame outside the infrastructure package
     * @throws RuntimeException if that class cannot be loaded, or if every frame belongs
     *                          to the infrastructure package
     */
    private Class<?> getSenderClass() {
        // A fresh Throwable is used so that the first frame is this method itself.
        StackTraceElement[] frames = new Throwable().getStackTrace();
        for (StackTraceElement frame : frames) {
            String className = frame.getClassName();
            if (className.startsWith(INTERNAL_PACKAGE_PREFIX)) {
                continue;
            }
            try {
                return Class.forName(className);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        throw new RuntimeException("未找到sender");
    }

    /**
     * Sends an event on behalf of a saga: the proxy records the saga's class as the
     * sender and carries the saga's UUID as its saga id.
     *
     * @param event the event to send
     * @param saga  the saga that emits the event
     */
    @Override
    public void sendSagaEvent(Event event, Saga saga) {
        EventProxy eventProxy = new EventProxy(event, saga.getClass());
        eventProxy.setSagaId(saga.getUuid());
        sendEvent(eventProxy);
    }

    /**
     * The actual send operation, provided by the concrete bus.
     *
     * @param eventProxy the message to send
     */
    protected abstract void sendEvent(EventProxy eventProxy);


    /**
     * Serializes an object to JSON bytes.
     *
     * @param event the object to serialize
     * @return the JSON representation as a byte array
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    public byte[] getEventByte(Object event) {
        try {
            return OBJECT_MAPPER.writeValueAsBytes(event);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Parses the bytes received from the message queue into an {@link EventProxy}.
     *
     * @param body the received JSON content
     * @return the deserialized proxy message
     * @throws RuntimeException wrapping the {@link IOException} if the content cannot be read
     */
    public EventProxy parseEventFormByte(byte[] body) {
        try {
            return OBJECT_MAPPER.readValue(body, EventProxy.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
